package com.raylu.day07;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class Example4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new SourceFunction<Integer>() {
                    @Override
                    public void run(SourceContext<Integer> ctx) throws Exception {
                        ctx.collectWithTimestamp(1,1000L);
                    }

                    @Override
                    public void cancel() {

                    }
                })
                .keyBy(r -> 1)
                .process(new KeyedProcessFunction<Integer, Integer, String>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
                        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        out.collect(ctx.timerService().currentWatermark() + "");
                    }
                })
                .print();

        env.execute();
    }
}
